package com.xiaofan

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Duration
import java.util
import java.util.Map
import scala.collection.mutable.ListBuffer


/**
 * One parsed Apache access-log line, i.e. the input record of the job.
 *
 * @param ip        IP address (1st space-separated field of the line)
 * @param userId    user id (2nd field)
 * @param timestamp event time in epoch milliseconds, parsed from the 4th field
 * @param method    request method such as "GET" (6th field)
 * @param url       requested url (7th field)
 */
case class ApacheLogEvent(
  ip: String,
  userId: String,
  timestamp: Long,
  method: String,
  url: String
)


/**
 * Result of the windowed aggregation: how many events one url received in one window.
 *
 * @param url       the url the count belongs to (the window key)
 * @param windowEnd end timestamp of the window, in epoch milliseconds
 * @param count     number of events aggregated in that window
 */
case class PageViewCount(
  url: String,
  windowEnd: Long,
  count: Long
)

/**
 * Real-time hot-page traffic statistics.
 *
 * Reads space-separated Apache access-log lines from a socket and counts GET requests per url.
 * The counting uses sliding event-time windows: size 10 min, slide 5 s, allowed lateness 1 min.
 * The top 3 urls of every window are then emitted. Events that are too late even for the
 * allowed lateness are routed to a side output and printed with the "late" prefix.
 */
object HotPageNetworkFlow {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Format of the timestamp field (4th space-separated token) of each log line.
    val dateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")

    //    val inputStream: DataStream[String] = env.readTextFile("D:\\big-data\\code\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")
    val inputStream: DataStream[String] = env.socketTextStream("192.168.1.27", 7777)

    // Tag for events that arrive after their windows (plus allowed lateness) have closed.
    // A single instance is shared by the producer (sideOutputLateData) and the consumer
    // (getSideOutput), so the tag id and element type cannot drift apart.
    val lateDataTag: OutputTag[ApacheLogEvent] = new OutputTag[ApacheLogEvent]("late-network-flow")

    val dataStream: DataStream[ApacheLogEvent] = inputStream
      .map {
        data => {
          val arr: Array[String] = data.split(" ")
          val ts: Long = dateFormat.parse(arr(3)).getTime
          ApacheLogEvent(arr(0), arr(1), ts, arr(5), arr(6))
        }
      }
      // Watermark reference: https://my.oschina.net/u/4596020/blog/4437979
      // NOTE(review): the author noted the scalac option "-target:jvm-1.8" here; presumably it is
      // required for this WatermarkStrategy usage - confirm.
      .assignTimestampsAndWatermarks {
        WatermarkStrategy
          // Tolerate events that are up to 1 s out of order.
          .forBoundedOutOfOrderness[ApacheLogEvent](Duration.ofSeconds(1))
          .withTimestampAssigner(new SerializableTimestampAssigner[ApacheLogEvent] {
            override def extractTimestamp(element: ApacheLogEvent, recordTimestamp: Long): Long = element.timestamp
          })
      }

    // Windowed aggregation: per-url GET counts in sliding windows.
    val aggStream: DataStream[PageViewCount] = dataStream
      .filter(_.method == "GET")
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      // Keep windows for 1 more minute; late events within it update and re-fire the window.
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(lateDataTag)
      .aggregate(new PageCountAgg, new PageViewWindowResult)

    dataStream.print("data")
    aggStream.print("agg")

    val resultStream: DataStream[String] = aggStream
      .keyBy(_.windowEnd)
      .process(new TopNHotPages(3))

    resultStream.print("resultStream")
    // The late side output is emitted by the window operator, so it must be read from
    // aggStream. Reading it from resultStream (the TopN process output) never yields anything.
    // With sliding windows, an event only reaches this side output once it is late for
    // every window it belongs to.
    aggStream.getSideOutput(lateDataTag).print("late")

    env.execute("hot page flow test")

  }
}


/**
 * Incremental pre-aggregation of the events of one url within one window.
 *
 * The accumulator is just the number of events seen so far, so the value handed to the
 * window function is the page-view count.
 */
class PageCountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {

  /** Every window starts counting from zero. */
  override def createAccumulator(): Long = {
    val initialCount: Long = 0
    initialCount
  }

  /** Each incoming event adds exactly one view; the event's fields are not inspected. */
  override def add(value: ApacheLogEvent, accumulator: Long): Long = {
    val increment = 1L
    increment + accumulator
  }

  /** The accumulator already is the final count. */
  override def getResult(accumulator: Long): Long = {
    val finalCount = accumulator
    finalCount
  }

  /** Combines two partial counts (needed when windows are merged, e.g. session windows). */
  override def merge(a: Long, b: Long): Long = b + a
}


/**
 * Window function that pairs the pre-aggregated count with the url key and the window end
 * time, producing one [[PageViewCount]] per window firing.
 */
class PageViewWindowResult() extends WindowFunction[Long, PageViewCount, String, TimeWindow] {

  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {
    // Behind an incremental aggregate, the iterable carries the single aggregated count.
    val viewCount: Long = input.head
    val windowEnd: Long = window.getEnd
    out.collect(PageViewCount(url = key, windowEnd = windowEnd, count = viewCount))
  }
}


/**
 * Builds a top-N ranking of urls for each window.
 *
 * The input is keyed by window end time, so each key collects the per-url counts of one
 * window. Counts live in MapState (url -> latest count). A late firing of the upstream
 * window overwrites the url's previous count, and the ranking is then emitted again.
 *
 * Two event-time timers are registered per window:
 *  - windowEnd + 1: the watermark has passed the window end, so the ranking is emitted;
 *  - windowEnd + stateRetentionMs: no more late results are expected, so the state is cleared.
 *
 * @param topSize          number of urls included in each emitted ranking
 * @param stateRetentionMs delay after the window end at which the state is cleared. It should
 *                         match the allowed lateness of the upstream window (default 1 minute).
 */
class TopNHotPages(topSize: Int, stateRetentionMs: Long = 60000L) extends KeyedProcessFunction[Long, PageViewCount, String] {

  // The cleanup timer must fire strictly after the ranking timer at windowEnd + 1.
  require(stateRetentionMs > 1L, s"stateRetentionMs must be > 1, got $stateRetentionMs")

  private var pageViewCountMapState: MapState[String, Long] = _

  override def open(parameters: Configuration): Unit = {
    // The descriptor name identifies this state (e.g. when restoring from a savepoint); keep it stable.
    pageViewCountMapState = getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("pageViewCount-list", classOf[String], classOf[Long]))
  }

  override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {
    // A later result for the same url (late firing) replaces the earlier count.
    pageViewCountMapState.put(value.url, value.count)

    val timers = ctx.timerService()
    // Ranking timer: fires once the watermark passes the window end. A late update registers
    // it again after it already fired, so the ranking is re-emitted with the next watermark.
    timers.registerEventTimeTimer(value.windowEnd + 1)
    // Cleanup timer: fires once the window can no longer produce results.
    timers.registerEventTimeTimer(value.windowEnd + stateRetentionMs)
  }

  // Either clears the window's state (cleanup timer) or emits the current ranking (ranking timer).
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    val windowEnd: Long = ctx.getCurrentKey
    if (timestamp == windowEnd + stateRetentionMs) {
      pageViewCountMapState.clear()
    } else {
      out.collect(formatRanking(windowEnd, topCounts()))
    }
  }

  /** Reads all (url, count) pairs from state and returns the `topSize` largest counts, in descending order. */
  private def topCounts(): List[(String, Long)] = {
    import scala.collection.JavaConverters._
    pageViewCountMapState.entries().asScala
      .map(entry => (entry.getKey, entry.getValue))
      .toList
      .sortBy(_._2)(Ordering[Long].reverse)
      .take(topSize)
  }

  /** Renders one ranking as the multi-line text record emitted downstream. */
  private def formatRanking(windowEnd: Long, ranking: List[(String, Long)]): String = {
    val result: StringBuilder = new StringBuilder
    result.append("窗口结束时间：").append(new Timestamp(windowEnd)).append("\n")

    // One line per ranked url, numbered from 1.
    ranking.zipWithIndex.foreach { case ((url, count), i) =>
      result.append("NO").append(i + 1).append(": \t")
        .append("URL = ").append(url).append("\t")
        .append("热门度 = ").append(count).append("\n")
    }

    result.append("===========================================\n\n")
    result.toString()
  }
}
